{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "c_Buqnnxn0L7"
   },
   "source": [
    "<center>\n",
    "    <p style=\"text-align:center\">\n",
    "        <img alt=\"phoenix logo\" src=\"https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg\" width=\"200\"/>\n",
    "        <br>\n",
    "        <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
    "        |\n",
    "        <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
    "        |\n",
    "        <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
    "    </p>\n",
    "</center>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "tTuMwH8Qg3kn"
   },
   "source": [
    "# LangGraph Agents: Orchestrator–Worker Pattern\n",
    "\n",
    "In this tutorial, we’ll build a multi-agent system using LangGraph's **Orchestrator–Worker pattern**, ideal for dynamically decomposing a task into subtasks, assigning them to specialized LLM agents, and synthesizing their responses.\n",
    "\n",
    "This pattern is particularly well-suited when the structure of subtasks is unknown ahead of time—such as when writing modular code, creating multi-section reports, or conducting research. The **orchestrator** plans and delegates, while the **workers** each complete their assigned section.\n",
    "\n",
    "We’ll also use **Phoenix** to trace and debug the orchestration process. With Phoenix, you can visually inspect which tasks the orchestrator generated, how each worker handled its section, and how the final output was assembled.\n",
    "\n",
    "By the end of this notebook, you’ll learn how to:\n",
    "- Use structured outputs to plan subtasks dynamically.\n",
    "- Assign subtasks to LLM workers via LangGraph's `Send` API.\n",
    "- Collect and synthesize multi-step LLM outputs.\n",
    "- Trace and visualize orchestration using Phoenix.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install langgraph langchain langchain_community \"arize-phoenix==9.0.1\" arize-phoenix-otel openinference-instrumentation-langchain"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install langchain_openai"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from getpass import getpass\n",
    "\n",
    "from langgraph.graph import END, START, StateGraph"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ[\"OPENAI_API_KEY\"] = getpass(\"🔑 Enter your OpenAI API key: \")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "CaQuJO1HhQmH"
   },
   "source": [
    "# Configure Phoenix Tracing\n",
    "\n",
    "Make sure you go to https://app.phoenix.arize.com/ and generate an API key. This will allow you to trace your Langgraph application with Phoenix."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if \"PHOENIX_API_KEY\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_API_KEY\"] = getpass(\"🔑 Enter your Phoenix API key: \")\n",
    "\n",
    "if \"PHOENIX_COLLECTOR_ENDPOINT\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = getpass(\"🔑 Enter your Phoenix Collector Endpoint\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.otel import register\n",
    "\n",
    "tracer_provider = register(project_name=\"Orchestrator\", auto_instrument=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "dATeDmFbhcWY"
   },
   "source": [
    "Orchestrator‑Workers • Research‑Paper Generator\n",
    "----------------------------------------------\n",
    "The orchestrator plans research‑paper *subsections* (abstract, background …),\n",
    "spawns one worker per subsection, then stitches everything into a full draft."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import operator\n",
    "from typing import Annotated, List, TypedDict\n",
    "\n",
    "from IPython.display import Markdown\n",
    "from langchain_core.messages import HumanMessage, SystemMessage"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "piT0RAtclEW-"
   },
   "source": [
    "# Step 1: Defining the Planning Schema\n",
    "To begin, we define a structured output schema using Pydantic. This schema ensures that the LLM returns well-formatted, predictable output when tasked with planning the structure of a research paper.\n",
    "\n",
    "We create two models:\n",
    "\n",
    "Subsection: Represents a single unit of the paper, including its name and a brief description of what it should cover.\n",
    "\n",
    "Subsections: A wrapper that holds a list of these units.\n",
    "\n",
    "By using these models with LangGraph’s with_structured_output feature, we enforce that the orchestrator LLM returns an organized plan — rather than freeform text — that downstream nodes (worker LLMs) can reliably use.\n",
    "\n",
    "This schema acts as the blueprint for the rest of the workflow."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_core.pydantic_v1 import BaseModel, Field\n",
    "from langgraph.constants import Send\n",
    "\n",
    "\n",
    "class Subsection(BaseModel):\n",
    "    name: str = Field(description=\"Name for this subsection of the research paper.\")\n",
    "    description: str = Field(\n",
    "        description=\"Concise description of the general subjects to be covered in this subsection.\"\n",
    "    )\n",
    "\n",
    "\n",
    "class Subsections(BaseModel):\n",
    "    Subsections: List[Subsection] = Field(description=\"All subsections of the research paper.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "AcIUeErDlTFb"
   },
   "source": [
    "# Step 2: Set Up LLM and Tools\n",
    "We initialize gpt-3.5-turbo as our base LLM and bind it to the Subsections schema to create the orchestrator. We also load a DuckDuckGo search tool to allow worker agents to enrich sections with live web data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "TAVILIY_API_KEY = getpass.getpass(\"Tavily API Key:\")\n",
    "os.environ[\"TAVILY_API_KEY\"] = TAVILIY_API_KEY"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_community.tools.tavily_search import TavilySearchResults\n",
    "from langchain_openai import ChatOpenAI\n",
    "\n",
    "llm = ChatOpenAI(model=\"gpt-3.5-turbo\", temperature=0)\n",
    "orchestrator_llm = llm.with_structured_output(Subsections)\n",
    "\n",
    "search = TavilySearchResults(k=5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "c1R_tZc_laar"
   },
   "source": [
    "# Step 3: Define Graph State\n",
    "We define two state schemas:\n",
    "\n",
    "State holds the overall research paper workflow, including the topic, planned subsections, completed text, and final output.\n",
    "\n",
    "WorkerState captures the task assigned to each worker — a single subsection — and where their contributions are accumulated.\n",
    "\n",
    "This shared state structure lets LangGraph coordinate work between the orchestrator and its worker agents."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class State(TypedDict):\n",
    "    topic: str\n",
    "    subsections: List[Subsection]\n",
    "    completed_subsections: Annotated[List[str], operator.add]\n",
    "    final_paper: str\n",
    "    search_results: str\n",
    "\n",
    "\n",
    "class WorkerState(TypedDict):\n",
    "    subsection: Annotated[Subsection, lambda x, y: y]\n",
    "    completed_subsections: Annotated[List[str], operator.add]\n",
    "    search_results: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ID2JeffkliCe"
   },
   "source": [
    "# Step 4: Define Nodes\n",
    "We define three core nodes in the graph:\n",
    "\n",
    "orchestrator: Dynamically plans the structure of the paper by generating a list of subsections using structured output.\n",
    "\n",
    "subsection_writer: Acts as a worker that writes one full subsection in academic Markdown, using the provided description and scope.\n",
    "\n",
    "synthesiser: Merges all completed subsections into a single cohesive draft, separating sections with visual dividers.\n",
    "\n",
    "Each node contributes to a modular, scalable paper-writing pipeline — and with Phoenix tracing, you can inspect every generation step in detail."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def orchestrator(state: State):\n",
    "    \"\"\"Plan the research‑paper subsections dynamically.\"\"\"\n",
    "    plan = orchestrator_llm.invoke(\n",
    "        [\n",
    "            SystemMessage(content=\"Generate a detailed subsection plan for a research paper.\"),\n",
    "            HumanMessage(content=f\"Paper topic: {state['topic']}\"),\n",
    "        ]\n",
    "    )\n",
    "    return {\"subsections\": plan.Subsections}\n",
    "\n",
    "\n",
    "def subsection_writer(state: WorkerState):\n",
    "    sub = state[\"subsection\"]\n",
    "    search_info = state.get(\"search_results\", \"\")\n",
    "\n",
    "    response = llm.invoke(\n",
    "        [\n",
    "            SystemMessage(\n",
    "                content=(\n",
    "                    \"You're writing a research-paper subsection using the following web search result as background and also your own knowledge.\"\n",
    "                )\n",
    "            ),\n",
    "            HumanMessage(\n",
    "                content=(\n",
    "                    f\"Subsection: {sub.name}\\n\"\n",
    "                    f\"Description: {sub.description}\\n\"\n",
    "                    f\"Shared Search Results:\\n{search_info}\\n\\n\"\n",
    "                    \"Now write the section.\"\n",
    "                )\n",
    "            ),\n",
    "        ]\n",
    "    )\n",
    "    return {\"completed_subsections\": [response.content]}\n",
    "\n",
    "\n",
    "def synthesiser(state: State):\n",
    "    \"\"\"Concatenate all finished subsections into the final paper draft.\"\"\"\n",
    "    full_paper = \"\\n\\n---\\n\\n\".join(state[\"completed_subsections\"])\n",
    "    return {\"final_paper\": full_paper}\n",
    "\n",
    "\n",
    "def search_tool(state: State):\n",
    "    query = f\"{state['topic']} research summary\"\n",
    "    search_results = search.invoke(query)\n",
    "    return {\"search_results\": search_results}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "tNqMCcNol8U2"
   },
   "source": [
    "# Step 5: Assign Workers Dynamically\n",
    "This function uses LangGraph’s Send API to launch a separate subsection_writer worker for each planned subsection. By dynamically spawning one worker per section, the system scales flexibly based on the topic’s complexity.\n",
    "\n",
    "This approach is ideal for research paper generation, where the number of sections is not known ahead of time — and Phoenix helps trace the output from each worker node independently."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def assign_workers(state: State):\n",
    "    \"\"\"Launch one subsection_writer per planned subsection (after shared search).\"\"\"\n",
    "    return [\n",
    "        Send(\"subsection_writer\", {\"subsection\": s, \"search_results\": state[\"search_results\"]})\n",
    "        for s in state[\"subsections\"]\n",
    "    ]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "6Ijfe_y9mFDO"
   },
   "source": [
    "# Step 6: Construct the LangGraph Workflow\n",
    "Here, we build the full LangGraph pipeline using a StateGraph. The workflow begins with the orchestrator node (to plan subsections), dynamically routes work to subsection_writer nodes (via assign_workers), and then aggregates all outputs in the synthesiser node.\n",
    "\n",
    "LangGraph’s conditional edges and Send API enable scalable parallelism — and with Phoenix tracing enabled, you can view how each section is created, tracked, and stitched together."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "builder = StateGraph(State)\n",
    "\n",
    "builder.add_node(\"orchestrator\", orchestrator)\n",
    "builder.add_node(\"search_tool\", search_tool)\n",
    "builder.add_node(\"subsection_writer\", subsection_writer)\n",
    "builder.add_node(\"synthesiser\", synthesiser)\n",
    "\n",
    "builder.add_edge(START, \"orchestrator\")\n",
    "builder.add_edge(\"orchestrator\", \"search_tool\")\n",
    "builder.add_conditional_edges(\"search_tool\", assign_workers, [\"subsection_writer\"])\n",
    "builder.add_edge(\"subsection_writer\", \"synthesiser\")\n",
    "builder.add_edge(\"synthesiser\", END)\n",
    "\n",
    "\n",
    "research_paper_workflow = builder.compile()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "y9sTPp-tmWIB"
   },
   "source": [
    "# Step 7: Run the Research Paper Generator\n",
    "We now invoke the compiled LangGraph with a sample topic: “Scaling Laws for Large Language Models.” The orchestrator plans the outline, each worker drafts a subsection in parallel, and the synthesizer assembles the full paper.\n",
    "\n",
    "With Phoenix integrated, every step is traced — from section planning to writing and synthesis — giving you full visibility into the execution flow and helping debug or refine outputs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "research_topics = [\n",
    "    \"How do scaling laws impact the performance of large language models?\",\n",
    "    \"What are the key challenges in training very large transformer models?\",\n",
    "    \"How much data is needed to train a performant LLM?\",\n",
    "    \"Explain the relationship between model size and accuracy in language models.\",\n",
    "    \"Why are modern language models undertrained, and how can we fix it?\",\n",
    "    \"What is compute-optimal training for LLMs?\",\n",
    "    \"Compare different scaling strategies for training foundation models.\",\n",
    "    \"How do researchers determine the best size for a transformer model?\",\n",
    "    \"What are the trade-offs between training time and model performance?\",\n",
    "    \"Summarize recent findings on training efficiency for large-scale language models.\",\n",
    "]\n",
    "\n",
    "for topic in research_topics:\n",
    "    state = research_paper_workflow.invoke({\"topic\": topic})\n",
    "\n",
    "print(\"===== RESEARCH PAPER DRAFT =====\\n\")\n",
    "Markdown(state[\"final_paper\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "mkfIm0uSmdpa"
   },
   "source": [
    "# Step 8: Check out your traces in Phoenix!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "Nlo8OLX_fdgM"
   },
   "source": [
    "# Let's add some Evaluations (Evals)\n",
    "\n",
    "In this section we will evaluate Agent Path Convergence.\n",
    "\n",
    "**avg(minimum steps taken for this query / steps in the run)**\n",
    "\n",
    "This helps compute the consistency of your orchestrator, across similar queries.\n",
    "\n",
    "See https://arize.com/docs/phoenix/evaluation/how-to-evals/running-pre-tested-evals/agent-path-convergence"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.client import Client\n",
    "from phoenix.client.types.spans import SpanQuery\n",
    "\n",
    "client = Client()\n",
    "df = client.spans.get_spans_dataframe(\n",
    "    query=SpanQuery().where(\"name == 'LangGraph'\"), project_identifier=\"Orchestrator\"\n",
    ")\n",
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "optimal_path_length = 7  # adjust this for your use case"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "5S9hY7027_96"
   },
   "source": [
    "## Generate scores"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "all_steps = []\n",
    "for row in df[\"attributes.output.value\"]:\n",
    "    data = json.loads(row)\n",
    "    num_subsections = len(data.get(\"subsections\", []))\n",
    "    all_steps.append(num_subsections)\n",
    "\n",
    "convergences = []\n",
    "optimal = min(all_steps)\n",
    "\n",
    "ratios = [optimal / step for step in all_steps]\n",
    "\n",
    "df[\"score\"] = ratios\n",
    "df[\"explanation\"] = [\"Minimum path length / this path length\"] * 11"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "HSU5rmte8EHL"
   },
   "source": [
    "# View your Evals in Phoenix\n",
    "\n",
    "At the top of your traces you will see a score under \"Agent Path Convergence\". That is the average of the scores we computed and should serve as your final metric for this evaluation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.client import AsyncClient\n",
    "\n",
    "px_client = AsyncClient()\n",
    "await px_client.spans.log_span_annotations_dataframe(\n",
    "    dataframe=df,\n",
    "    annotation_name=\"Agent Path Convergence\",\n",
    "    annotator_kind=\"LLM\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
